/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.lucene.index;

import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.KnnFloatVectorField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.analysis.CannedTokenStream;
import org.apache.lucene.tests.analysis.MockAnalyzer;
import org.apache.lucene.tests.index.SuppressingConcurrentMergeScheduler;
import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SameThreadExecutorService;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.SuppressForbidden;
import org.apache.lucene.util.Version;

public class TestConcurrentMergeScheduler extends LuceneTestCase {

  private class FailOnlyOnFlush extends MockDirectoryWrapper.Failure {
    boolean doFail;
    boolean hitExc;

    @Override
    public void setDoFail() {
      this.doFail = true;
      hitExc = false;
    }

    @Override
    public void clearDoFail() {
      this.doFail = false;
    }

    @Override
    public void eval(MockDirectoryWrapper dir) throws IOException {
      if (doFail && isTestThread()) {
        if (callStackContainsAnyOf("flush")
            && false == callStackContainsAnyOf("close")
            && random().nextBoolean()) {
          hitExc = true;
          throw new IOException(Thread.currentThread().getName() + ": now failing during flush");
        }
      }
    }
  }

  // Make sure running BG merges still work fine even when
  // we are hitting exceptions during flushing.
  public void testFlushExceptions() throws IOException {
    MockDirectoryWrapper directory = newMockDirectory();
    FailOnlyOnFlush failure = new FailOnlyOnFlush();
    directory.failOn(failure);
    IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())).setMaxBufferedDocs(2);
    if (iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler) {
      iwc.setMergeScheduler(
          new SuppressingConcurrentMergeScheduler() {
            @Override
            protected boolean isOK(Throwable th) {
              return th instanceof AlreadyClosedException
                  || (th instanceof IllegalStateException
                      && th.getMessage().contains("this writer hit an unrecoverable error"));
            }

            @Override
            // override here to ensure even tiny merges get the parallel executor
            public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
              assert intraMergeExecutor != null : "intraMergeExecutor is not initialized";
              return intraMergeExecutor;
            }
          });
    }
    IndexWriter writer = new IndexWriter(directory, iwc);
    Document doc = new Document();
    Field idField = newStringField("id", "", Field.Store.YES);
    KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f});
    doc.add(idField);
    // Add knn float vectors to test parallel merge
    doc.add(knnField);

    outer:
    for (int i = 0; i < 10; i++) {
      if (VERBOSE) {
        System.out.println("TEST: iter=" + i);
      }

      for (int j = 0; j < 20; j++) {
        idField.setStringValue(Integer.toString(i * 20 + j));
        knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()});
        writer.addDocument(doc);
      }

      // must cycle here because sometimes the merge flushes
      // the doc we just added and so there's nothing to
      // flush, and we don't hit the exception
      while (true) {
        writer.addDocument(doc);
        failure.setDoFail();
        try {
          writer.flush(true, true);
          if (failure.hitExc) {
            fail("failed to hit IOException");
          }
        } catch (IOException ioe) {
          if (VERBOSE) {
            ioe.printStackTrace(System.out);
          }
          failure.clearDoFail();
          // make sure we are closed or closing - if we are unlucky a merge does
          // the actual closing for us. this is rare but might happen since the
          // tragicEvent is checked by IFD and that might throw during a merge
          expectThrows(AlreadyClosedException.class, writer::ensureOpen);
          // Abort should have closed the deleter:
          assertTrue(writer.isDeleterClosed());
          writer.close(); // now wait for the close to actually happen if a merge thread did the
          // close.
          break outer;
        }
      }
    }

    assertFalse(DirectoryReader.indexExists(directory));
    directory.close();
  }

  // Test that deletes committed after a merge started and
  // before it finishes, are correctly merged back:
  public void testDeleteMerging() throws IOException {
    Directory directory = newDirectory();

    LogDocMergePolicy mp = new LogDocMergePolicy();
    // Force degenerate merging so we can get a mix of
    // merging of segments with and without deletes at the
    // start:
    mp.setMinMergeDocs(1000);
    IndexWriter writer =
        new IndexWriter(
            directory, newIndexWriterConfig(new MockAnalyzer(random())).setMergePolicy(mp));
    TestUtil.reduceOpenFiles(writer);

    Document doc = new Document();
    Field idField = newStringField("id", "", Field.Store.YES);
    doc.add(idField);
    for (int i = 0; i < 10; i++) {
      if (VERBOSE) {
        System.out.println("\nTEST: cycle");
      }
      for (int j = 0; j < 100; j++) {
        idField.setStringValue(Integer.toString(i * 100 + j));
        writer.addDocument(doc);
      }

      int delID = i;
      while (delID < 100 * (1 + i)) {
        if (VERBOSE) {
          System.out.println("TEST: del " + delID);
        }
        writer.deleteDocuments(new Term("id", "" + delID));
        delID += 10;
      }

      writer.commit();
    }

    writer.close();
    IndexReader reader = DirectoryReader.open(directory);
    // Verify that we did not lose any deletes...
    assertEquals(450, reader.numDocs());
    reader.close();
    directory.close();
  }

  public void testNoExtraFiles() throws IOException {
    Directory directory = newDirectory();
    IndexWriter writer =
        new IndexWriter(
            directory, newIndexWriterConfig(new MockAnalyzer(random())).setMaxBufferedDocs(2));

    for (int iter = 0; iter < 7; iter++) {
      if (VERBOSE) {
        System.out.println("TEST: iter=" + iter);
      }

      for (int j = 0; j < 21; j++) {
        Document doc = new Document();
        doc.add(newTextField("content", "a b c", Field.Store.NO));
        writer.addDocument(doc);
      }

      writer.close();
      TestIndexWriter.assertNoUnreferencedFiles(directory, "testNoExtraFiles");

      // Reopen
      writer =
          new IndexWriter(
              directory,
              newIndexWriterConfig(new MockAnalyzer(random()))
                  .setOpenMode(OpenMode.APPEND)
                  .setMaxBufferedDocs(2));
    }

    writer.close();

    directory.close();
  }

  public void testNoWaitClose() throws IOException {
    Directory directory = newDirectory();
    Document doc = new Document();
    Field idField = newStringField("id", "", Field.Store.YES);
    KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f});
    doc.add(idField);
    doc.add(knnField);
    IndexWriterConfig iwc =
        newIndexWriterConfig(new MockAnalyzer(random()))
            // Force excessive merging:
            .setMaxBufferedDocs(2)
            .setMergePolicy(newLogMergePolicy(100))
            .setCommitOnClose(false);
    if (iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler) {
      iwc.setMergeScheduler(
          new ConcurrentMergeScheduler() {
            @Override
            // override here to ensure even tiny merges get the parallel executor
            public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
              assert intraMergeExecutor != null : "scaledExecutor is not initialized";
              return intraMergeExecutor;
            }
          });
    }

    IndexWriter writer = new IndexWriter(directory, iwc);

    int numIters = TEST_NIGHTLY ? 10 : 3;
    for (int iter = 0; iter < numIters; iter++) {

      for (int j = 0; j < 201; j++) {
        idField.setStringValue(Integer.toString(iter * 201 + j));
        knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()});
        writer.addDocument(doc);
      }

      int delID = iter * 201;
      for (int j = 0; j < 20; j++) {
        writer.deleteDocuments(new Term("id", Integer.toString(delID)));
        delID += 5;
      }

      // Force a bunch of merge threads to kick off so we
      // stress out aborting them on close:
      ((LogMergePolicy) writer.getConfig().getMergePolicy()).setMergeFactor(3);
      writer.addDocument(doc);

      try {
        writer.commit();
      } finally {
        writer.close();
      }

      IndexReader reader = DirectoryReader.open(directory);
      assertEquals((1 + iter) * 182, reader.numDocs());
      reader.close();

      // Reopen
      writer =
          new IndexWriter(
              directory,
              newIndexWriterConfig(new MockAnalyzer(random()))
                  .setOpenMode(OpenMode.APPEND)
                  .setMergePolicy(newLogMergePolicy(100))
                  .
                  // Force excessive merging:
                  setMaxBufferedDocs(2)
                  .setCommitOnClose(false));
    }
    writer.close();

    directory.close();
  }

  // LUCENE-4544
  @SuppressForbidden(reason = "Thread sleep")
  public void testMaxMergeCount() throws Exception {
    Directory dir = newDirectory();
    IndexWriterConfig iwc =
        new IndexWriterConfig(new MockAnalyzer(random())).setCommitOnClose(false);

    final int maxMergeCount = TestUtil.nextInt(random(), 1, 5);
    final int maxMergeThreads = TestUtil.nextInt(random(), 1, maxMergeCount);
    final CountDownLatch enoughMergesWaiting = new CountDownLatch(maxMergeCount);
    final AtomicInteger runningMergeCount = new AtomicInteger(0);
    final AtomicBoolean failed = new AtomicBoolean();

    if (VERBOSE) {
      System.out.println(
          "TEST: maxMergeCount=" + maxMergeCount + " maxMergeThreads=" + maxMergeThreads);
    }

    ConcurrentMergeScheduler cms =
        new ConcurrentMergeScheduler() {
          @SuppressForbidden(reason = "Thread sleep")
          @Override
          protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge)
              throws IOException {
            try {
              // Stall all incoming merges until we see
              // maxMergeCount:
              int count = runningMergeCount.incrementAndGet();
              try {
                assertTrue(
                    "count=" + count + " vs maxMergeCount=" + maxMergeCount,
                    count <= maxMergeCount);
                enoughMergesWaiting.countDown();

                // Stall this merge until we see exactly
                // maxMergeCount merges waiting
                while (true) {
                  if (enoughMergesWaiting.await(10, TimeUnit.MILLISECONDS) || failed.get()) {
                    break;
                  }
                }
                // Then sleep a bit to give a chance for the bug
                // (too many pending merges) to appear:
                Thread.sleep(20);
                super.doMerge(mergeSource, merge);
              } finally {
                runningMergeCount.decrementAndGet();
              }
            } catch (Throwable t) {
              failed.set(true);
              mergeSource.onMergeFinished(merge);
              throw new RuntimeException(t);
            }
          }
        };
    cms.setMaxMergesAndThreads(maxMergeCount, maxMergeThreads);
    iwc.setMergeScheduler(cms);
    iwc.setMaxBufferedDocs(2);

    TieredMergePolicy tmp = new TieredMergePolicy();
    iwc.setMergePolicy(tmp);
    tmp.setSegmentsPerTier(2);

    IndexWriter w = new IndexWriter(dir, iwc);
    Document doc = new Document();
    doc.add(newField("field", "field", TextField.TYPE_NOT_STORED));
    while (enoughMergesWaiting.getCount() != 0 && !failed.get()) {
      for (int i = 0; i < 10; i++) {
        w.addDocument(doc);
      }
    }
    try {
      w.commit();
    } finally {
      w.close();
    }
    dir.close();
  }

  public void testSmallMergesDonNotGetThreads() throws IOException {
    Directory dir = newDirectory();
    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
    iwc.setMaxBufferedDocs(2);
    iwc.setMergeScheduler(
        new ConcurrentMergeScheduler() {
          @Override
          protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge)
              throws IOException {
            assertTrue(this.getIntraMergeExecutor(merge) instanceof SameThreadExecutorService);
            super.doMerge(mergeSource, merge);
          }
        });
    IndexWriter w = new IndexWriter(dir, iwc);
    for (int i = 0; i < 10; i++) {
      Document doc = new Document();
      doc.add(new StringField("id", "" + i, Field.Store.NO));
      w.addDocument(doc);
    }
    w.forceMerge(1);
    w.close();
    dir.close();
  }

  public void testIntraMergeThreadPoolIsLimitedByMaxThreads() throws IOException {
    ConcurrentMergeScheduler mergeScheduler = new ConcurrentMergeScheduler();
    MergeScheduler.MergeSource mergeSource =
        new MergeScheduler.MergeSource() {
          @Override
          public MergePolicy.OneMerge getNextMerge() {
            fail("should not be called");
            return null;
          }

          @Override
          public void onMergeFinished(MergePolicy.OneMerge merge) {
            fail("should not be called");
          }

          @Override
          public boolean hasPendingMerges() {
            fail("should not be called");
            return false;
          }

          @Override
          public void merge(MergePolicy.OneMerge merge) throws IOException {
            fail("should not be called");
          }
        };
    try (Directory dir = newDirectory();
        mergeScheduler) {
      MergePolicy.OneMerge merge =
          new MergePolicy.OneMerge(
              List.of(
                  new SegmentCommitInfo(
                      new SegmentInfo(
                          dir,
                          Version.LATEST,
                          null,
                          "test",
                          0,
                          false,
                          false,
                          Codec.getDefault(),
                          Collections.emptyMap(),
                          StringHelper.randomId(),
                          new HashMap<>(),
                          null),
                      0,
                      0,
                      0,
                      0,
                      0,
                      new byte[16])));
      mergeScheduler.initialize(InfoStream.NO_OUTPUT, dir);
      mergeScheduler.setMaxMergesAndThreads(6, 6);
      Executor executor = mergeScheduler.intraMergeExecutor;
      AtomicInteger threadsExecutedOnPool = new AtomicInteger();
      AtomicInteger threadsExecutedOnSelf = new AtomicInteger();
      CountDownLatch latch = new CountDownLatch(1);
      final int totalThreads = 4;
      for (int i = 0; i < totalThreads; i++) {
        mergeScheduler.mergeThreads.add(
            mergeScheduler.new MergeThread(mergeSource, merge) {
              @Override
              public void run() {
                executor.execute(
                    () -> {
                      if (Thread.currentThread() == this) {
                        threadsExecutedOnSelf.incrementAndGet();
                      } else {
                        threadsExecutedOnPool.incrementAndGet();
                      }
                      try {
                        latch.await();
                      } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                      }
                    });
              }
            });
      }
      for (ConcurrentMergeScheduler.MergeThread thread : mergeScheduler.mergeThreads) {
        thread.start();
      }
      while (threadsExecutedOnSelf.get() + threadsExecutedOnPool.get() < totalThreads) {
        Thread.yield();
      }
      latch.countDown();
      mergeScheduler.sync();
      assertEquals(3, threadsExecutedOnSelf.get());
      assertEquals(1, threadsExecutedOnPool.get());
    }
  }

  private static class TrackingCMS extends ConcurrentMergeScheduler {
    long totMergedBytes;
    CountDownLatch atLeastOneMerge;

    public TrackingCMS(CountDownLatch atLeastOneMerge) {
      setMaxMergesAndThreads(5, 5);
      this.atLeastOneMerge = atLeastOneMerge;
    }

    @Override
    public void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
      totMergedBytes += merge.totalBytesSize();
      atLeastOneMerge.countDown();
      super.doMerge(mergeSource, merge);
    }
  }

  public void testTotalBytesSize() throws Exception {
    Directory d = newDirectory();
    if (d instanceof MockDirectoryWrapper) {
      ((MockDirectoryWrapper) d).setThrottling(MockDirectoryWrapper.Throttling.NEVER);
    }
    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
    iwc.setMaxBufferedDocs(5);
    CountDownLatch atLeastOneMerge = new CountDownLatch(1);
    iwc.setMergeScheduler(new TrackingCMS(atLeastOneMerge));
    if (TestUtil.getPostingsFormat("id").equals("SimpleText")) {
      // no
      iwc.setCodec(TestUtil.alwaysPostingsFormat(TestUtil.getDefaultPostingsFormat()));
    }
    IndexWriter w = new IndexWriter(d, iwc);
    for (int i = 0; i < 1000; i++) {
      Document doc = new Document();
      doc.add(new StringField("id", "" + i, Field.Store.NO));
      w.addDocument(doc);

      if (random().nextBoolean()) {
        w.deleteDocuments(new Term("id", "" + random().nextInt(i + 1)));
      }
    }
    atLeastOneMerge.await();
    assertTrue(((TrackingCMS) w.getConfig().getMergeScheduler()).totMergedBytes != 0);
    w.close();
    d.close();
  }

  public void testInvalidMaxMergeCountAndThreads() throws Exception {
    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
    expectThrows(
        IllegalArgumentException.class,
        () ->
            cms.setMaxMergesAndThreads(ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS, 3));
    expectThrows(
        IllegalArgumentException.class,
        () ->
            cms.setMaxMergesAndThreads(3, ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS));
  }

  public void testLiveMaxMergeCount() throws Exception {
    Directory d = newDirectory();
    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
    iwc.setMergePolicy(
        new MergePolicy() {

          @Override
          public MergeSpecification findMerges(
              MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
              throws IOException {
            // no natural merges
            return null;
          }

          @Override
          public MergeSpecification findForcedDeletesMerges(
              SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
            // not needed
            return null;
          }

          @Override
          public MergeSpecification findForcedMerges(
              SegmentInfos segmentInfos,
              int maxSegmentCount,
              Map<SegmentCommitInfo, Boolean> segmentsToMerge,
              MergeContext mergeContext)
              throws IOException {
            // The test is about testing that CMS bounds the number of merging threads, so we just
            // return many merges.
            MergeSpecification spec = new MergeSpecification();
            List<SegmentCommitInfo> oneMerge = new ArrayList<>();
            for (SegmentCommitInfo sci : segmentsToMerge.keySet()) {
              oneMerge.add(sci);
              if (oneMerge.size() >= 10) {
                spec.add(new OneMerge(new ArrayList<>(oneMerge)));
                oneMerge.clear();
              }
            }
            return spec;
          }
        });
    iwc.setMaxBufferedDocs(2);
    iwc.setRAMBufferSizeMB(-1);

    final AtomicInteger maxRunningMergeCount = new AtomicInteger();

    ConcurrentMergeScheduler cms =
        new ConcurrentMergeScheduler() {

          final AtomicInteger runningMergeCount = new AtomicInteger();

          @Override
          public void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge)
              throws IOException {
            int count = runningMergeCount.incrementAndGet();
            // evil?
            synchronized (this) {
              if (count > maxRunningMergeCount.get()) {
                maxRunningMergeCount.set(count);
              }
            }
            try {
              super.doMerge(mergeSource, merge);
            } finally {
              runningMergeCount.decrementAndGet();
            }
          }
        };

    assertEquals(ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS, cms.getMaxMergeCount());
    assertEquals(ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS, cms.getMaxThreadCount());

    cms.setMaxMergesAndThreads(5, 3);

    iwc.setMergeScheduler(cms);

    IndexWriter w = new IndexWriter(d, iwc);
    // Makes 100 segments
    for (int i = 0; i < 200; i++) {
      w.addDocument(new Document());
    }

    // No merges should have run so far, because TMP has high segmentsPerTier:
    assertEquals(0, maxRunningMergeCount.get());
    w.forceMerge(1);

    // At most 5 merge threads should have launched at once:
    assertTrue("maxRunningMergeCount=" + maxRunningMergeCount, maxRunningMergeCount.get() <= 5);
    maxRunningMergeCount.set(0);

    // Makes another 100 segments
    for (int i = 0; i < 200; i++) {
      w.addDocument(new Document());
    }

    ((ConcurrentMergeScheduler) w.getConfig().getMergeScheduler()).setMaxMergesAndThreads(1, 1);
    w.forceMerge(1);

    // At most 1 merge thread should have launched at once:
    assertEquals(1, maxRunningMergeCount.get());

    w.close();
    d.close();
  }

  // LUCENE-6063
  public void testMaybeStallCalled() throws Exception {
    final AtomicBoolean wasCalled = new AtomicBoolean();
    Directory dir = newDirectory();
    IndexWriterConfig iwc =
        newIndexWriterConfig(new MockAnalyzer(random()))
            .setMergePolicy(new LogByteSizeMergePolicy());
    iwc.setMergeScheduler(
        new ConcurrentMergeScheduler() {
          @Override
          protected boolean maybeStall(MergeSource mergeSource) {
            wasCalled.set(true);
            return true;
          }
        });
    IndexWriter w = new IndexWriter(dir, iwc);
    w.addDocument(new Document());
    w.flush();
    w.addDocument(new Document());
    w.forceMerge(1);
    assertTrue(wasCalled.get());
    w.close();
    dir.close();
  }

  // LUCENE-6094
  @SuppressForbidden(reason = "Thread sleep")
  public void testHangDuringRollback() throws Throwable {
    Directory dir = newMockDirectory();
    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
    iwc.setMaxBufferedDocs(2);
    LogDocMergePolicy mp = new LogDocMergePolicy();
    iwc.setMergePolicy(mp);
    mp.setMergeFactor(2);
    final CountDownLatch mergeStart = new CountDownLatch(1);
    final CountDownLatch mergeFinish = new CountDownLatch(1);
    ConcurrentMergeScheduler cms =
        new ConcurrentMergeScheduler() {
          @Override
          protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge)
              throws IOException {
            mergeStart.countDown();
            try {
              mergeFinish.await();
            } catch (InterruptedException ie) {
              throw new RuntimeException(ie);
            }
            super.doMerge(mergeSource, merge);
          }
        };
    cms.setMaxMergesAndThreads(1, 1);
    iwc.setMergeScheduler(cms);

    final IndexWriter w = new IndexWriter(dir, iwc);

    w.addDocument(new Document());
    w.addDocument(new Document());
    // flush

    w.addDocument(new Document());
    w.addDocument(new Document());
    // flush + merge

    // Wait for merge to kick off
    mergeStart.await();

    new Thread() {
      @Override
      public void run() {
        try {
          w.addDocument(new Document());
          w.addDocument(new Document());
          // flush

          w.addDocument(new Document());
          // W/o the fix for LUCENE-6094 we would hang forever here:
          w.addDocument(new Document());
          // flush + merge

          // Now allow first merge to finish:
          mergeFinish.countDown();

        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    }.start();

    while (w.getDocStats().numDocs != 8) {
      Thread.sleep(10);
    }

    w.rollback();
    dir.close();
  }

  // LUCENE-10118 : Verify the basic log output from MergeThreads
  public void testMergeThreadMessages() throws Exception {
    Directory dir = newDirectory();
    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
    Set<Thread> mergeThreadSet = ConcurrentHashMap.newKeySet();
    ConcurrentMergeScheduler cms =
        new ConcurrentMergeScheduler() {
          @Override
          protected synchronized MergeThread getMergeThread(
              MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
            MergeThread newMergeThread = super.getMergeThread(mergeSource, merge);
            mergeThreadSet.add(newMergeThread);
            return newMergeThread;
          }
        };
    iwc.setMergeScheduler(cms);

    List<String> messages = Collections.synchronizedList(new ArrayList<>());
    iwc.setInfoStream(
        new InfoStream() {
          @Override
          public void close() {}

          @Override
          public void message(String component, String message) {
            if (component.equals("MS")) messages.add(message);
          }

          @Override
          public boolean isEnabled(String component) {
            return component.equals("MS");
          }
        });
    iwc.setMaxBufferedDocs(2);
    LogMergePolicy lmp = newLogMergePolicy();
    lmp.setMergeFactor(2);
    lmp.setTargetSearchConcurrency(1);
    iwc.setMergePolicy(lmp);

    IndexWriter w = new IndexWriter(dir, iwc);
    Document doc = new Document();
    doc.add(new TextField("foo", new CannedTokenStream()));
    w.addDocument(doc);
    w.addDocument(new Document());
    // flush
    w.addDocument(new Document());
    w.addDocument(new Document());
    // flush + merge
    w.close();
    dir.close();

    assertTrue(mergeThreadSet.size() > 0);
    for (Thread t : mergeThreadSet) {
      t.join();
    }
    for (Thread t : mergeThreadSet) {
      String name = t.getName();
      List<String> threadMsgs =
          messages.stream().filter(line -> line.startsWith("merge thread " + name)).toList();
      assertTrue(
          "Expected:·a·value·equal·to·or·greater·than·3,·got:"
              + threadMsgs.size()
              + ", threadMsgs="
              + threadMsgs,
          threadMsgs.size() >= 3);
      assertTrue(threadMsgs.get(0).startsWith("merge thread " + name + " start"));
      assertTrue(
          threadMsgs.stream()
              .anyMatch(line -> line.startsWith("merge thread " + name + " merge segment")));
      assertTrue(threadMsgs.get(threadMsgs.size() - 1).startsWith("merge thread " + name + " end"));
    }
  }

  public void testDynamicDefaults() throws Exception {
    Directory dir = newDirectory();
    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
    assertEquals(ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS, cms.getMaxMergeCount());
    assertEquals(ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS, cms.getMaxThreadCount());
    iwc.setMergeScheduler(cms);
    iwc.setMaxBufferedDocs(2);
    LogMergePolicy lmp = newLogMergePolicy();
    lmp.setMergeFactor(2);
    iwc.setMergePolicy(lmp);

    IndexWriter w = new IndexWriter(dir, iwc);
    w.addDocument(new Document());
    w.addDocument(new Document());
    // flush

    w.addDocument(new Document());
    w.addDocument(new Document());
    // flush + merge

    // CMS should have now set true values:
    assertTrue(cms.getMaxMergeCount() != ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS);
    assertTrue(cms.getMaxThreadCount() != ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS);
    w.close();
    dir.close();
  }

  public void testResetToAutoDefault() throws Exception {
    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
    assertEquals(ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS, cms.getMaxMergeCount());
    assertEquals(ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS, cms.getMaxThreadCount());
    cms.setMaxMergesAndThreads(4, 3);
    assertEquals(4, cms.getMaxMergeCount());
    assertEquals(3, cms.getMaxThreadCount());

    expectThrows(
        IllegalArgumentException.class,
        () ->
            cms.setMaxMergesAndThreads(ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS, 4));

    expectThrows(
        IllegalArgumentException.class,
        () ->
            cms.setMaxMergesAndThreads(4, ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS));

    cms.setMaxMergesAndThreads(
        ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS,
        ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS);
    assertEquals(ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS, cms.getMaxMergeCount());
    assertEquals(ConcurrentMergeScheduler.AUTO_DETECT_MERGES_AND_THREADS, cms.getMaxThreadCount());
  }

  public void testSpinningDefaults() throws Exception {
    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
    cms.setDefaultMaxMergesAndThreads(true);
    assertEquals(1, cms.getMaxThreadCount());
    assertEquals(6, cms.getMaxMergeCount());
  }

  public void testAutoIOThrottleGetter() throws Exception {
    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
    assertFalse(cms.getAutoIOThrottle());
    cms.enableAutoIOThrottle();
    assertTrue(cms.getAutoIOThrottle());
    cms.disableAutoIOThrottle();
    assertFalse(cms.getAutoIOThrottle());
  }

  public void testNonSpinningDefaults() throws Exception {
    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
    cms.setDefaultMaxMergesAndThreads(false);
    int threadCount = cms.getMaxThreadCount();
    assertTrue(threadCount >= 1);
    assertTrue(threadCount <= 4);
    assertEquals(5 + threadCount, cms.getMaxMergeCount());
  }

  // LUCENE-6197
  public void testNoStallMergeThreads() throws Exception {
    MockDirectoryWrapper dir = newMockDirectory();

    IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
    iwc.setMergePolicy(NoMergePolicy.INSTANCE);
    iwc.setMaxBufferedDocs(2);
    iwc.setUseCompoundFile(true); // reduce open files
    IndexWriter w = new IndexWriter(dir, iwc);
    int numDocs = TEST_NIGHTLY ? 1000 : 100;
    for (int i = 0; i < numDocs; i++) {
      Document doc = new Document();
      doc.add(newStringField("field", "" + i, Field.Store.YES));
      w.addDocument(doc);
    }
    w.close();

    iwc = newIndexWriterConfig(new MockAnalyzer(random()));
    AtomicBoolean failed = new AtomicBoolean();
    ConcurrentMergeScheduler cms =
        new ConcurrentMergeScheduler() {
          @Override
          protected void doStall() {
            if (Thread.currentThread().getName().startsWith("Lucene Merge Thread")) {
              failed.set(true);
            }
            super.doStall();
          }
        };
    cms.enableAutoIOThrottle();
    cms.setMaxMergesAndThreads(2, 1);
    iwc.setMergeScheduler(cms);
    iwc.setMaxBufferedDocs(2);

    w = new IndexWriter(dir, iwc);
    w.forceMerge(1);
    w.close();
    dir.close();

    assertFalse(failed.get());
  }

  /*
   * This test tries to produce 2 merges running concurrently with 2 segments per merge. While these
   * merges run we kick off a forceMerge that puts a pending merge in the queue but waits for things to happen.
   * While we do this we reduce maxMergeCount to 1. If concurrency in CMS is not right the forceMerge will wait forever
   * since none of the currently running merges picks up the pending merge. This test fails every time.
   */
  public void testChangeMaxMergeCountyWhileForceMerge() throws IOException, InterruptedException {
    int numIters = TEST_NIGHTLY ? 100 : 10;
    for (int iters = 0; iters < numIters; iters++) {
      LogDocMergePolicy mp = new LogDocMergePolicy();
      mp.setMergeFactor(2);
      CountDownLatch forceMergeWaits = new CountDownLatch(1);
      CountDownLatch mergeThreadsStartAfterWait = new CountDownLatch(1);
      CountDownLatch mergeThreadsArrived = new CountDownLatch(2);
      InfoStream stream =
          new InfoStream() {
            @Override
            public void message(String component, String message) {
              if ("TP".equals(component) && "mergeMiddleStart".equals(message)) {
                mergeThreadsArrived.countDown();
                try {
                  mergeThreadsStartAfterWait.await();
                } catch (InterruptedException e) {
                  throw new AssertionError(e);
                }
              } else if ("TP".equals(component) && "forceMergeBeforeWait".equals(message)) {
                forceMergeWaits.countDown();
              }
            }

            @Override
            public boolean isEnabled(String component) {
              return "TP".equals(component);
            }

            @Override
            public void close() {}
          };
      try (Directory dir = newDirectory();
          IndexWriter writer =
              new IndexWriter(
                  dir,
                  new IndexWriterConfig()
                      .setMergeScheduler(new ConcurrentMergeScheduler())
                      .setMergePolicy(mp)
                      .setInfoStream(stream)) {
                @Override
                protected boolean isEnableTestPoints() {
                  return true;
                }
              }) {
        Thread t =
            new Thread(
                () -> {
                  try {
                    writer.forceMerge(1);
                  } catch (IOException e) {
                    throw new AssertionError(e);
                  }
                });
        ConcurrentMergeScheduler cms =
            (ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler();
        cms.setMaxMergesAndThreads(2, 2);
        try {
          for (int i = 0; i < 4; i++) {
            Document document = new Document();
            document.add(
                new TextField(
                    "foo", "the quick brown fox jumps over the lazy dog", Field.Store.YES));
            document.add(
                new TextField(
                    "bar",
                    RandomStrings.randomRealisticUnicodeOfLength(random(), 20),
                    Field.Store.YES));
            writer.addDocument(document);
            writer.flush();
          }
          assertEquals(writer.cloneSegmentInfos().toString(), 4, writer.getSegmentCount());
          mergeThreadsArrived.await();
          t.start();
          forceMergeWaits.await();
          cms.setMaxMergesAndThreads(1, 1);
        } finally {
          mergeThreadsStartAfterWait.countDown();
        }

        while (t.isAlive()) {
          t.join(10);
          if (cms.mergeThreadCount() == 0 && writer.hasPendingMerges()) {
            fail("writer has pending merges but no CMS threads are running");
          }
        }
        assertEquals(1, writer.getSegmentCount());
      }
    }
  }
}
